import os
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from cvss import CVSS2,CVSS3
from itertools import product,combinations
import math
import re
nvddata=pd.read_csv('/home/Bushu/Documents/Enviroment/allen/data/nvdwithmissng.csv',header="infer")
#nvddata=nvddata[nvddata.cvss_scorev3 != 'nov3']
nvddata=nvddata.iloc[:,[0,1,3,6,7,]]
targetdata=pd.read_csv('/home/Bushu/Documents/Enviroment/allen/data/mainscan/targetresult.csv')
threatdata=pd.read_csv('/home/Bushu/Documents/Enviroment/allen/data/mainscan/threatresult.csv')
cwe_patterns=pd.read_csv('/home/Bushu/Documents/Enviroment/allen/data/cwelist.csv')
# data=pd.read_csv('/home/Bushu/Documents/Final Paper/data/nvd2021e.csv',header="infer")
#https://www.cvedetails.com/vulnerability-list/year-2007/month-1/January.html
len(nvddata[nvddata.cvss_scorev3 != "nov3"])
152987
targetdata.fillna(0, inplace=True)
# nvddata.cvss_scorev3.replace(to_replace=["CVSS:3.1","CVSS:3.0"],value="nov3",inplace=True)
cwe_patterns=cwe_patterns.iloc[:,[0,6,21]]
nvddata.head(10)
| cve_number | cvss_scorev3 | cwe_number | description | exploitability | |
|---|---|---|---|---|---|
| 0 | CVE-1999-0001 | nov3 | unidentified | ip_input.c in BSD-derived TCP/IP implementatio... | noex |
| 1 | CVE-1999-0002 | nov3 | unidentified | Buffer overflow in NFS mountd gives root acces... | noex |
| 2 | CVE-1999-0003 | nov3 | unidentified | Execute commands as root via buffer overflow i... | noex |
| 3 | CVE-1999-0004 | nov3 | unidentified | MIME buffer overflow in email clients, e.g. So... | noex |
| 4 | CVE-1999-0005 | nov3 | unidentified | Arbitrary command execution via IMAP buffer ov... | noex |
| 5 | CVE-1999-0006 | nov3 | unidentified | Buffer overflow in POP servers based on BSD/Qu... | noex |
| 6 | CVE-1999-0008 | nov3 | unidentified | Buffer overflow in NIS+, in Sun's rpc.nisd pro... | noex |
| 7 | CVE-1999-0009 | nov3 | unidentified | Inverse query buffer overflow in BIND 4.9 and ... | noex |
| 8 | CVE-1999-0010 | nov3 | unidentified | Denial of Service vulnerability in BIND 8 Rele... | noex |
| 9 | CVE-1999-0011 | nov3 | unidentified | Denial of Service vulnerabilities in BIND 4.9 ... | noex |
def Vulenerablity_connector(vec):
if vec != "nov3":
v3=vec.split('/')
v3_dic={ x.split(':')[0]:x.split(':')[1] for x in v3 }
# this values are taken form cvss 3.1 metric score table
# Attack Vector = N=0.85 A=0.62 L=0.55 P=0.22
# Attack Complexity = H=0.77 L=0.44
# Priviledges Required N=0.85 scope changed(L=0.68,H=0.5) or scope not changed(L=0.62,H=0.27)
# scope C is changed and U is not changed
# math. sqrt(x)
av_value={"N":0.85, "A":0.62 ,"L":0.55 ,"P":0.22}
ac_value={"H":0.77, "L":0.44 }
pr_value={ "C":{"L":0.68,"H":0.5, "N":0.85,}, "U":{"L":0.62,"H":0.27, "N":0.85,}}
probablity=(pr_value[v3_dic["S"]][v3_dic["PR"]]/(pr_value[v3_dic["S"]][v3_dic["PR"]]+math.sqrt(av_value[v3_dic["AV"]]*ac_value[v3_dic["AC"]])))
return probablity
return "unknown"
nvddata["Vulnerablity_connector"]=nvddata.cvss_scorev3.map(Vulenerablity_connector)
# this is target version,os and device list
#//////////////////////////////////////////////////////
ver=targetdata.version.unique()
os=targetdata.OSInfo.unique()
os=[x for x in os if x != 0]
dev=targetdata.Device.unique()
dev=[x for x in dev if x != 0]
#/////////////////////////////////////////////////////////
# this is threat version,os and device list
# /////////////////////////////////////////////////////
th_ver=threatdata.version.unique()
th_os=threatdata.OSInfo.unique()
th_os=[x for x in th_os if x != 0]
th_dev=threatdata.Device.unique()
th_dev=[x for x in th_dev if x != 0]
# /////////////////////////////////////////////////////////////////
# This is target session filter
# ///////////////////////////////////////
def test_target_service(description):
if re.search(f'({"|".join(os)})',description):
return True
return False
def test_target_device(description):
if re.search(f'({"|".join(dev)})',description):
return True
return False
def test_target_version(description):
if re.search(f'({"|".join(ver)})',description):
return True
return False
# ////////////////////////////////////////
# This is threat source version filter
# /////////////////////////////////////////////
def test_threat_service(description):
if re.search(f'({"|".join(th_os)})',description):
return True
return False
def test_threat_device(description):
if re.search(f'({"|".join(th_dev)})',description):
return True
return False
def test_threat_version(description):
if re.search(f'({"|".join(th_ver)})',description):
return True
return False
# ///////////////////////////////////////////////////////
#scoring func### Availablity Severity distributioniton
# /////////////////////////////////////////////////////
def cvs3_sc(vector):
score=CVSS3(vector)
return score.scores()[0]
#distribution function
#this catagoraizaitn is based on CVSS3 specification table
# //////////////////////////////////////////////////////
def compare(score):
if score == 0:
return "None"
elif 0.1 <= score <= 3.9:
return "Low"
elif 4<= score <=6.9:
return "Medium"
elif 7<=score<=8.9:
return "High"
elif 9<=score<=10:
return "Critical"
# ///////////////////////////////////////////////////
# Running Filter Map functions for target
# ///////////////////////////////
# please note that order maters here when executing
nvddata['target_possible_v']=nvddata.description.map(test_target_version)
nvddata['target_possible_s']=nvddata.description.map(test_target_service)
nvddata['target_possible_dev']=nvddata.description.map(test_target_device)
# Running Filter Map functions for threat source
# ///////////////////////////////
# please note that order maters here when executing
nvddata['threat_possible_v']=nvddata.description.map(test_threat_version)
nvddata['threat_possible_s']=nvddata.description.map(test_threat_service)
#nvddata['threat_possible_dev']=nvddata.description.map(test_threat_device)
target_cves=nvddata[(nvddata.target_possible_v == True) | (nvddata.target_possible_s == True) | (nvddata.target_possible_dev == True) ]
threat_cves=nvddata[(nvddata.threat_possible_v == True) | (nvddata.threat_possible_s == True)]
target_list_with_nov3=target_cves[target_cves.cvss_scorev3 == "nov3"]
target_list_with_score=target_cves[target_cves.cvss_scorev3 != "nov3"]
threat_list_with_nov3=threat_cves[threat_cves.cvss_scorev3 == "nov3"]
threat_list_with_score=threat_cves[threat_cves.cvss_scorev3 != "nov3"]
len(target_cves)
312
len(threat_cves)
218
#genarated_combination
genarated_combination=list(product(threat_list_with_score.cve_number,target_list_with_score.cve_number))
chained_list=pd.DataFrame({'threat_target':genarated_combination},columns=['threat_target'])
print(f"length of target cve is : {len(target_list_with_score.cve_number)}")
print(f"length of threat cve is : {len(threat_list_with_score.cve_number)}")
length of target cve is : 281 length of threat cve is : 218
# dictionary of connector,score and cwe id
nvd_score_dic=dict(zip(nvddata.cve_number,nvddata.cvss_scorev3))
connector_dic=dict(zip(nvddata['cve_number'],nvddata['Vulnerablity_connector']))
cweid_dic=dict(zip(nvddata['cve_number'],nvddata['cwe_number']))
#calculating combined probablity
def combined_connector(v3):
return connector_dic[v3[0]]*connector_dic[v3[1]]
#calculating chainned score
def combined_chainned_score(v3):
if nvd_score_dic.get(v3[0]):
threat_lis=nvd_score_dic.get(v3[0]).split('/')
if nvd_score_dic.get(v3[1]):
target_lis=nvd_score_dic.get(v3[1]).split('/')
if nvd_score_dic.get(v3[1]) and nvd_score_dic.get(v3[0]):
threat={x.split(':')[0]:x.split(':')[1] for x in threat_lis}
target={x.split(':')[0]:x.split(':')[1] for x in target_lis}
# this values are taken form cvss 3.1 metric score table
# Attack Vector = N=0.85 A=0.62 L=0.55 P=0.22
# confidentiality/Integerity/Availablity H=0.56,L=0.22,N=0
# User Interaction N=0.85 R=0.62
# Attack Complexity = H=0.77 L=0.44
# Priviledges Required N=0.85 scope changed(L=0.68,H=0.5) or scope not changed(L=0.62,H=0.27)
#print(threat)
scope=threat['S']
if threat['S'] != target['S']:
scope="C"
ui=threat['UI']
if threat['UI'] != target['UI']:
ui="N"
ac=threat['AC']
if threat['AC'] != target['AC']:
ac="H"
if threat['C'] == "H" or target['C'] == "H":
conf="H"
elif threat['C'] == "L" or target['C'] == "L" and (threat['C'] != "H" or target['C'] != "H"):
conf="L"
else:
conf="N"
if threat['I'] == "H" or target['I'] == "H":
integ="H"
elif threat['I'] == "L" or target['I'] == "L" and (threat['I'] != "H" or target['I'] != "H"):
integ="L"
else:
integ="N"
if threat['A'] == "H" or target['A'] == "H":
avail="H"
elif threat['A'] == "L" or target['A'] == "L" and (threat['A'] != "H" or target['A'] != "H"):
avail="L"
else:
avail="N"
if threat['PR'] == "N" or target['PR'] == "N":
pr="N"
elif threat['PR'] == "L" or target['PR'] == "L" and (threat['PR'] != "N" or target['PR'] != "N"):
pr="L"
else:
pr="H"
#NALP
if threat['AV'] == "N" or target['AV'] == "N":
av="N"
elif threat['AV'] == "A" or target['AV'] == "A" and (threat['AV'] != "N" or target['AV'] != "N"):
av="A"
elif threat['AV'] == "L" or target['AV'] == "L" and (threat['AV'] != "A" or target['AV'] != "A" or threat['AV'] != "N" or target['AV'] != "N"):
av="L"
else:
av="P"
chained_vector= f"CVSS:3.1/AV:{av}/AC:{ac}/PR:{pr}/UI:{ui}/S:{scope}/C:{conf}/I:{integ}/A:{avail}"
#
return CVSS3(chained_vector).scores()[0]
#return chained_vector
else:
return "unkown"
#checking related weakness from the CWE related pattern list
cwe_patterns.Related_Attack_Patterns.fillna("nothing",inplace=True)
def cwe_related_patterns(h):
if h != "nothing":
return h.strip().split("::")
cwe_patterns['related_list']=cwe_patterns.Related_Attack_Patterns.map(cwe_related_patterns)
pattern_litmus_dic=dict(zip(cwe_patterns.CWE_ID,cwe_patterns.related_list))
def checking_related_pattern(vec):
if cweid_dic.get(vec[0]) or cweid_dic.get(vec[1]):
if cweid_dic.get(vec[0]) in cweid_dic.get(vec[1]) or cweid_dic.get(vec[1]) in cweid_dic.get(vec[0]):
return "probable relation"
return "no information"
return "no information"
chained_list['vulnerablity_connector']=chained_list.threat_target.map(combined_connector)
chained_list['cvssv3_chained_score']=chained_list.threat_target.map(combined_chainned_score)
chained_list['relation_for_chainning']=chained_list.threat_target.map(checking_related_pattern)
def target_source_av(v3):
if nvd_score_dic.get(v3[0]):
threat_lis=nvd_score_dic.get(v3[0]).split('/')
if nvd_score_dic.get(v3[1]):
target_lis=nvd_score_dic.get(v3[1]).split('/')
if nvd_score_dic.get(v3[1]) and nvd_score_dic.get(v3[0]):
threat={x.split(':')[0]:x.split(':')[1] for x in threat_lis}
target={x.split(':')[0]:x.split(':')[1] for x in target_lis}
return f"{threat['AV']}-{target['AV']}"
def filter_source_target_logic(x):
if x.split('-')[0] == "P" or x.split('-')[1] == "P" or x =="A-A":
return False;
return True
chained_list['av_source_target']=chained_list.threat_target.map(target_source_av)
chained_list['chain_log']=chained_list.av_source_target.map(filter_source_target_logic)
# filtering with chainlogic values
chained_list=chained_list[chained_list.chain_log == True]
chained_list.drop('chain_log',1)
print('nothing')
nothing
#simple spliting funcitons
def source_split(x):
return x[0]
def target_split(x):
return x[1]
def compare(score):
if score == 0:
return "None"
elif 0.1 <= score <= 3.9:
return "Low"
elif 4<= score <=6.9:
return "Medium"
elif 7<=score<=8.9:
return "High"
elif 9<=score<=10:
return "Critical"
chained_list['severity']=chained_list.cvssv3_chained_score.map(compare)
chained_list['source_cve']=chained_list.threat_target.map(source_split)
chained_list['target_cve']=chained_list.threat_target.map(target_split)
chained_list.vulnerablity_connector.describe()
count 60756.000000 mean 0.309311 std 0.046867 min 0.089811 25% 0.292780 50% 0.324277 75% 0.338226 max 0.401213 Name: vulnerablity_connector, dtype: float64
chained_list.cvssv3_chained_score.describe()
count 60756.00000 mean 6.91074 std 2.26155 min 0.00000 25% 5.60000 50% 7.30000 75% 8.30000 max 10.00000 Name: cvssv3_chained_score, dtype: float64
plt.figure(figsize=(20,15))
sns.pairplot(chained_list[['vulnerablity_connector', 'cvssv3_chained_score','relation_for_chainning']], hue='relation_for_chainning')
# plt.savefig("vc_cs.png")
<seaborn.axisgrid.PairGrid at 0x7f16af6b3be0>
<Figure size 1440x1080 with 0 Axes>
plt.figure(figsize=(20,15))
sns.pairplot(chained_list[['vulnerablity_connector', 'cvssv3_chained_score','severity']], hue='severity')
/home/Bushu/Documents/Enviroment/allen/lib/python3.9/site-packages/seaborn/distributions.py:306: UserWarning: Dataset has 0 variance; skipping density estimate. warnings.warn(msg, UserWarning)
<seaborn.axisgrid.PairGrid at 0x7f16ad942b80>
<Figure size 1440x1080 with 0 Axes>
chained_list.head(5)
| threat_target | vulnerablity_connector | cvssv3_chained_score | relation_for_chainning | av_source_target | chain_log | severity | source_cve | target_cve | |
|---|---|---|---|---|---|---|---|---|---|
| 0 | (CVE-2001-0288, CVE-2000-0076) | 0.368376 | 7.3 | probable relation | N-L | True | High | CVE-2001-0288 | CVE-2000-0076 |
| 1 | (CVE-2001-0288, CVE-2000-0366) | 0.368376 | 7.3 | probable relation | N-L | True | High | CVE-2001-0288 | CVE-2000-0366 |
| 2 | (CVE-2001-0288, CVE-2000-0367) | 0.368376 | 7.3 | probable relation | N-L | True | High | CVE-2001-0288 | CVE-2000-0367 |
| 3 | (CVE-2001-0288, CVE-2000-1135) | 0.368376 | 7.3 | probable relation | N-L | True | High | CVE-2001-0288 | CVE-2000-1135 |
| 4 | (CVE-2001-0288, CVE-2000-1136) | 0.368376 | 7.3 | probable relation | N-L | True | High | CVE-2001-0288 | CVE-2000-1136 |
len(chained_list[chained_list.relation_for_chainning == "probable relation"].source_cve.unique())
198
tweakness=chained_list.groupby(['target_cve'])['severity'].value_counts().reset_index(name='counts')
tweakness.head(10)
| target_cve | severity | counts | |
|---|---|---|---|
| 0 | CVE-2000-0076 | High | 87 |
| 1 | CVE-2000-0076 | Medium | 60 |
| 2 | CVE-2000-0076 | Low | 42 |
| 3 | CVE-2000-0076 | Critical | 28 |
| 4 | CVE-2000-0366 | High | 87 |
| 5 | CVE-2000-0366 | Medium | 60 |
| 6 | CVE-2000-0366 | Low | 42 |
| 7 | CVE-2000-0366 | Critical | 28 |
| 8 | CVE-2000-0367 | High | 87 |
| 9 | CVE-2000-0367 | Low | 38 |
# plt.figure(figsize=(20,15))
# sns.pairplot(tweakness[['target_cve','counts','severity']], hue='severity')
len(tweakness.target_cve.unique())
274
for k in tweakness.target_cve.unique():
stweal=tweakness[tweakness.target_cve == k]
plt.figure(figsize=(9,7))
tprs=sns.barplot(y ='counts',x='target_cve', hue = "severity", data =stweal)
plt.title(f"{k} weakness cvss score discription", fontsize=24)
########################Sveierity Disribtuion########################################################################
#the following code snippet below is taken from stackoverflow,find the link below
# https://stackoverflow.com/questions/62002434/how-to-add-data-labels-to-seaborn-barplot
#################################################################################################
for p in tprs.patches:
tprs.annotate("%.0f" % p.get_height(), (p.get_x() + p.get_width() / 2., p.get_height()),
ha='center', va='center', fontsize=12, color='black', xytext=(0, 5),
textcoords='offset points')
################################################################################################
<ipython-input-53-4bca1ccabbea>:3: RuntimeWarning: More than 20 figures have been opened. Figures created through the pyplot interface (`matplotlib.pyplot.figure`) are retained until explicitly closed and may consume too much memory. (To control this warning, see the rcParam `figure.max_open_warning`). plt.figure(figsize=(9,7))
count 896.000000 mean 67.808036 std 42.359422 min 4.000000 25% 35.000000 50% 56.000000 75% 96.000000 max 296.000000 Name: counts, dtype: float64
# len(chained_list[chained_list.cvssv3_chained_score >= 4 ].source_cve.unique())
plt.figure(figsize=(16,9))
sns.displot(chained_list.vulnerablity_connector)
<seaborn.axisgrid.FacetGrid at 0x7f16abe062b0>
<Figure size 1152x648 with 0 Axes>
plt.figure(figsize=(16,9))
sns.kdeplot(data=chained_list, x='vulnerablity_connector')
<AxesSubplot:xlabel='vulnerablity_connector', ylabel='Density'>
# len(chained_list[].target_cve.unique())
len(chained_list)
60756
summary_chain=chained_list[(chained_list.cvssv3_chained_score > 4) & (chained_list.relation_for_chainning == "probable relation")]
#summary_chain=summary_chain.groupby('target_cve')
# result=summary_chain['severity'].value_counts().reset_index(name='score_counts')
# len(result.target_cve.unique())
len(target_list_with_score
.cve_number)
281
# plt.figure(figsize=(16,9))
# prs=sns.barplot(y ='score_counts',x='target_cve', hue = "severity", data =result)
# plt.title("Privledges Required and Scope", fontsize=24)
# ########################Sveierity Disribtuion########################################################################
# #the following code snippet below is taken from stackoverflow,find the link below
# # https://stackoverflow.com/questions/62002434/how-to-add-data-labels-to-seaborn-barplot
# #################################################################################################
# for p in prs.patches:
# prs.annotate("%.0f" % p.get_height(), (p.get_x() + p.get_width() / 2., p.get_height()),
# ha='center', va='center', fontsize=12, color='black', xytext=(0, 5),
# textcoords='offset points')
# ################################################################################################
# # plt.savefig("countsPRScope.png")
# plt.figure(figsize=(30,15))
# tcs=sns.countplot(x='target_cve',hue = "severity", data =summary_chain)
# plt.title("Target CVE count catagorized with seveirity", fontsize=24)
# tcs.set_xticklabels(tcs.get_xticklabels(), rotation=45)
# ########################Sveierity Disribtuion########################################################################
# #the following code snippet below is taken from stackoverflow,find the link below
# # https://stackoverflow.com/questions/62002434/how-to-add-data-labels-to-seaborn-barplot
# #################################################################################################
# for p in tcs.patches:
# tcs.annotate("%.0f" % p.get_height(), (p.get_x() + p.get_width() / 2., p.get_height()),
# ha='center', va='center', fontsize=12, color='black', xytext=(0, 5),
# textcoords='offset points')
# ################################################################################################
# # # plt.savefig("countsPRScope.png")
--------------------------------------------------------------------------- NameError Traceback (most recent call last) <ipython-input-35-637c8adad608> in <module> 14 # # # plt.savefig("countsPRScope.png") 15 ---> 16 something NameError: name 'something' is not defined
plt.figure(figsize=(9,7))
sns.heatmap(chained_list.loc[:,['vulnerablity_connector','cvssv3_chained_score']].corr(), annot=True)
<AxesSubplot:>
#maximum vulnerablity connectivity can assume
def max_connectivity(x):
return x/(x+math.sqrt(0.2*0.44))
gt=np.arange(0.27, 0.85, 0.01)
plt.plot(gt, max_connectivity(gt),'r--')
[<matplotlib.lines.Line2D at 0x7f26d274efd0>]
#minimum vulnerablity connectitivity can assume
def min_connectivity(x):
return x/(x+math.sqrt(0.77*0.85))
gt=np.arange(0.27, 0.85, 0.01)
plt.plot(gt, min_connectivity(gt),'r--')
[<matplotlib.lines.Line2D at 0x7f26d26b89d0>]
# Max connector value
print(0.85/(0.85+math.sqrt(0.2*0.44))) # max connectivity value
0.7412911764204058
#min cinnector value
print(0.27/(0.27+math.sqrt(0.7*0.85))) # min connectivity value
0.25927572567958324
# chained_list.to_csv("finalproduct.csv",index=False,encoding="utf-8")